ROS2 Executor详解

本文由 ZiQingDream 编写,未经允许禁止转载。

本文作者:工程课代表[B站] / ZiQingDream[Github]

说明

为什么有这篇文章:

  1. 在使用ROS2编排话题、服务等,很少有人能够清楚、明确地说明ROS2调度逻辑;
  2. ROS2官网对于执行器和回调组的说明偏简单,可能产生理解偏差,本文尝试补充官网的细节;
  3. 多线程执行器误用是造成ROS2死锁、崩溃、通信效率低下问题根源。
    这篇文章从代码层面说明多种执行器差异,让阅读者对于执行器内部调度逻辑有比较清楚的认识,从而减少在机器人程序中出现的并发问题。

阅读本文需要你有一定ROS2基本理解,至少要有关于话题发布、订阅基础知识。

单线程执行器(SingleThreadedExecutor)

实验

发布程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
//src/ros2/examples/rclcpp/topics/minimal_publisher/member_function.cpp
#include <chrono>
#include <functional>
#include <memory>
#include <string>

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"

using namespace std::chrono_literals;

// 从使用上来说,继承仅仅是为了简化代码操作,在ROS2整个设计体系下,
// 节点抽象更加独立和具体,作为类成员也很方便,不必非要写成继承
class MinimalPublisher : public rclcpp::Node
{
public:
MinimalPublisher()
: Node("minimal_publisher"), count_(0)
{
// 建立一个定时器,不断发送消息给订阅端
publisher_ = this->create_publisher<std_msgs::msg::String>("topic", 10);
timer_ = this->create_wall_timer(
500ms, std::bind(&MinimalPublisher::timer_callback, this));
}

private:
void timer_callback()
{
auto message = std_msgs::msg::String();
message.data = "Hello, world! " + std::to_string(count_++);
RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", message.data.c_str());
publisher_->publish(message);
}

rclcpp::TimerBase::SharedPtr timer_;
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
size_t count_;
};

int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<MinimalPublisher>());
rclcpp::shutdown();
return 0;
}

订阅程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//src/ros2/examples/rclcpp/topics/minimal_subscriber/member_function.cpp
#include <functional>
#include <memory>

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"

using std::placeholders::_1;

class MinimalSubscriber : public rclcpp::Node
{
public:
MinimalSubscriber()
: Node("minimal_subscriber")
{
subscription_ = this->create_subscription<std_msgs::msg::String>(
"topic", 10, std::bind(&MinimalSubscriber::topic_callback, this, _1));
}

private:
void topic_callback(const std_msgs::msg::String & msg) const
{
RCLCPP_INFO(this->get_logger(), "I heard: '%s'", msg.data.c_str());
}
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_;
};

int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<MinimalSubscriber>());
rclcpp::shutdown();
return 0;
}

这两个程序是ros2官方提供example。我们现忽略对于话题和节点细节,后面逐步展开,重点关注消息调度,也就是这里的rclcpp::spin()函数。

全局spin()

如官网所述rclcpp::spin()函数其实就是SingleThreadedExecutor全局包装而已:

也就是说,上述的Publisher和Subscriber例子,都包含一个单线程执行器(rclcpp::executors::SingleThreadedExecutor),由该执行器调度ROS2消息:

1
2
3
4
5
6
7
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<MinimalPublisher>());
rclcpp::shutdown();
return 0;
}

代码基本等价于下方代码:新建一个单线程执行器,并把节点加入执行器。

1
2
3
4
5
6
7
8
9
10
11
12
13
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);

auto node = std::make_shared<MinimalPublisher>();

rclcpp::executors::SingleThreadedExecutor executor;
executor.add_node(node);
executor.spin();

rclcpp::shutdown();
return 0;
}

以上的结论,我们从humble代码中,可以清晰的了解到:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// src/ros2/rclcpp/rclcpp/src/rclcpp/executors.cpp
void rclcpp::spin(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
{
rclcpp::ExecutorOptions options;
options.context = node_ptr->get_context();
// 建立一个单线程执行器
rclcpp::executors::SingleThreadedExecutor exec(options);
// 把节点加入到其中
exec.add_node(node_ptr);
// 进入调度线程
exec.spin();
exec.remove_node(node_ptr);
}

// 首先调用这个函数(非类成员),然后执行重载的rclcpp::spin()
// 参数rclcpp::Node::SharedPtr指针的版本
// 直接转发到rclcpp::node_interfaces::NodeBaseInterface::SharedPtr版本
void rclcpp::spin(rclcpp::Node::SharedPtr node_ptr)
{
rclcpp::spin(node_ptr->get_node_base_interface());
}

要非常注意,代码不能写成以下的形式,否则你会莫名其妙找不掉节点或话题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);

rclcpp::executors::SingleThreadedExecutor executor;

// 注意这一行代码
// 在函数调用完成后,std::make_shared<MinimalPublisher>()返回的
// 临时变量就会被销毁
// !!! 不要这样写 !!!
executor.add_node(std::make_shared<MinimalPublisher>());

executor.spin(); // 直到这里才会阻塞

rclcpp::shutdown();
return 0;
}

解释起来不复杂,std::make_shared< MinimalPublisher >()创建的节点共享指针,executor.add_node()并没有保存,函数执行返回后,共享指针被销毁,导致节点和话题失效。

1
2
3
4
5
6
// src/ros2/rclcpp/rclcpp/src/rclcpp/executor.cpp
// Executor是SingleThreadedExecutor父类
void Executor::add_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify)
{
this->add_node(node_ptr->get_node_base_interface(), notify);
}

那为什么rclcpp::spin()不会出现问题?原因是rclcpp::spin()会执行调度循环,一般不会退出来,共享指针也就被保留下来(我认为这是ros2代码缺陷,造成开发者有点精神分裂)。

单线程执行器spin()

让我们的关注点再回到单线程执行器上来,进入SingleThreadedExecutor::spin()看一看到底发生了什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
void SingleThreadedExecutor::spin()
{
// std::atomic_bool spinning;
// 原子变量,表示是否当前已经有人spin(),这部分防止多次spin():
// 1. 如果没调用过spin(): spinning = true,并返回false
// 2. 如果已经spin():spinning=true,并返回true,此时会抛出异常,程序退出
if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
}

// 这个不细说,这行代码在异常/函数退出时,将spinning=false,
// 主要让多线程内部正常退出,防止资源泄露
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );

while (rclcpp::ok(this->context_) && spinning.load()) {
/*struct AnyExecutable
{
RCLCPP_PUBLIC
AnyExecutable();

RCLCPP_PUBLIC
virtual ~AnyExecutable();

// Only one of the following pointers will be set.
rclcpp::SubscriptionBase::SharedPtr subscription; // 话题订阅
rclcpp::TimerBase::SharedPtr timer; // 定时器
rclcpp::ServiceBase::SharedPtr service; // 服务端
rclcpp::ClientBase::SharedPtr client; // 客户端
rclcpp::Waitable::SharedPtr waitable;
// These are used to keep the scope on the containing items
rclcpp::CallbackGroup::SharedPtr callback_group;
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base;
std::shared_ptr<void> data;
}; */
rclcpp::AnyExecutable any_executable;

// 获取一个可调用的对象:话题订阅、定时器、服务回调、客户端回调、waitable
if (get_next_executable(any_executable)) {
// 如果获取成功,执行回调函数
execute_any_executable(any_executable);
}
}
}

整个过程就一个线程(也就是调用spin()的线程),循环尝试获取可调用对象,如果成功,就执行回调函数。这样加入到单线程执行器中节点的内部回调串行执行,流程图如下:

以上代码并没有说明底层发生了什么,只是强调了单线程执行器的串行执行。在ROS2 Humble官网文档中,有一张图简单说明了调度过程,这里补充下调度细节,以便大家更好理解后续多线程执行器的流程。Executors — ROS 2 Documentation: Humble documentation

  • 左图上方圆形表示回调函数,例如processOdom()是/odom话题的回调
  • 左图下方信封形状代表接收到数据,例如/goal有两个数据,/odom有三个数据,/cmd没有数据
  • 左图中间说明了调度的三个过程:
    1. wait:尝试等待底层(Middleware)数据到来。初始化wait_set,并根据底层数据情况,设置wait_set(wait_set可以简单理解成数组,每个数组元素表示队列里是否有可处理的数据,1表示有可处理数据,0表示没有数据)。图中/goal和/odom有消息,对应的掩码就是1,/cmd没有消息,对应的掩码就是0。所以wait_set就是101。这个过程在get_next_executable()函数中。
    2. take:会获取一个可执行对象,这个过程在get_next_executable()函数中。
    3. execute:执行回调函数,拿到可调用对象的线程,会直接调用回调函数。这个过程在execute_any_executable()函数中。

多线程执行器(MultiThreadedExecutor)

在进入多线程执行器前,先说明下为什么要用多线程执行器,要回答这个问题,先来看看单线程执行器有什么问题:

  1. 单线程执行器,所有回调都是串行执行,如果其中回调执行的工作比较多,其他回调必须等待,造成调度后延
  2. 即使回调执行很短,但如果你有很多回调需要服务,甚至需要他们同时执行,单线程执行器很难满足
  3. 如果你想在topic/service回调中,执行一个阻塞的操作(如发送另一个service请求并同步等待响应),你就会遇到程序卡死问题
    多线程执行器的引入,直接通过启动多个线程,带来回调执行上的并发,可以有效接触调用阻塞,提高节点的响应。当然,编程世界里没有银弹,多线程执行器同样带来很多隐含的特性,而且这些特性并不是很容易理解。我在这里先抛出几个问题,如果你不能清晰地回答,或许接下来分析会对你有所帮助:
  • 问题1:多线程执行器与单线程执行器有关系?执行器会启动多少线程?
  • 问题2:多线程执行器spin()和单线程spin()差异在哪里?
  • 问题3:什么是回调组?”Mutually Exclusive Callback Group”和”Reentrant Callback Group”区别是什么?
  • 问题4:多线程执行器并行调度的单元是什么,是节点吗?怎么做到回调并发执行?
  • 问题5:”Reentrant Callback Group”能否导致同一个节点的同一个话题的回调,被多个线程并发调用?

实验

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
// src/ros2/examples/rclcpp/executors/multithreaded_executor/multithreaded_executor.cpp
#include <chrono>
#include <functional>
#include <memory>
#include <string>
#include <thread>

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"

using namespace std::chrono_literals;

std::string string_thread_id()
{
auto hashed = std::hash<std::thread::id>()(std::this_thread::get_id());
return std::to_string(hashed);
}

// 发送话题节点
class PublisherNode : public rclcpp::Node
{
public:
PublisherNode()
: Node("PublisherNode"), count_(0)
{
publisher_ = this->create_publisher<std_msgs::msg::String>("topic", 10);
auto timer_callback =
[this]() -> void {
auto message = std_msgs::msg::String();
message.data = "Hello World! " + std::to_string(this->count_++);

// Extract current thread
auto curr_thread = string_thread_id();

// 定时器每隔一段时间会发送自己线程ID
RCLCPP_INFO(
this->get_logger(), "\n<<THREAD %s>> Publishing '%s'",
curr_thread.c_str(), message.data.c_str());
this->publisher_->publish(message);
};
timer_ = this->create_wall_timer(500ms, timer_callback);
}

private:
rclcpp::TimerBase::SharedPtr timer_;
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
size_t count_;
};

class DualThreadedNode : public rclcpp::Node
{
public:
DualThreadedNode()
: Node("DualThreadedNode")
{
/* These define the callback groups
* They don't really do much on their own, but they have to exist in order to
* assign callbacks to them. They're also what the executor looks for when trying to run multiple threads
*/
callback_group_subscriber1_ = this->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive);
callback_group_subscriber2_ = this->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive);

// Each of these callback groups is basically a thread
// Everything assigned to one of them gets bundled into the same thread
auto sub1_opt = rclcpp::SubscriptionOptions();
sub1_opt.callback_group = callback_group_subscriber1_;
auto sub2_opt = rclcpp::SubscriptionOptions();
sub2_opt.callback_group = callback_group_subscriber2_;

subscription1_ = this->create_subscription<std_msgs::msg::String>(
"topic",
rclcpp::QoS(10),
// std::bind is sort of C++'s way of passing a function
// If you're used to function-passing, skip these comments
std::bind(
&DualThreadedNode::subscriber1_cb, // First parameter is a reference to the function
this, // What the function should be bound to
std::placeholders::_1), // At this point we're not positive of all the
// parameters being passed
// So we just put a generic placeholder
// into the binder
// (since we know we need ONE parameter)
sub1_opt); // This is where we set the callback group.
// This subscription will run with callback group subscriber1

subscription2_ = this->create_subscription<std_msgs::msg::String>(
"topic",
rclcpp::QoS(10),
std::bind(
&DualThreadedNode::subscriber2_cb,
this,
std::placeholders::_1),
sub2_opt);
}

private:
std::string timing_string()
{
rclcpp::Time time = this->now();
return std::to_string(time.nanoseconds());
}

/**
* Every time the Publisher publishes something, all subscribers to the topic get poked
* This function gets called when Subscriber1 is poked (due to the std::bind we used when defining it)
*/
void subscriber1_cb(const std_msgs::msg::String::ConstSharedPtr msg)
{
auto message_received_at = timing_string();

// Extract current thread
RCLCPP_INFO(
this->get_logger(), "THREAD %s => Heard '%s' at %s",
string_thread_id().c_str(), msg->data.c_str(), message_received_at.c_str());
}

/**
* This function gets called when Subscriber2 is poked
* Since it's running on a separate thread than Subscriber 1, it will run at (more-or-less) the same time!
*/
void subscriber2_cb(const std_msgs::msg::String::ConstSharedPtr msg)
{
auto message_received_at = timing_string();

// Prep display message
RCLCPP_INFO(
this->get_logger(), "THREAD %s => Heard '%s' at %s",
string_thread_id().c_str(), msg->data.c_str(), message_received_at.c_str());
}

rclcpp::CallbackGroup::SharedPtr callback_group_subscriber1_;
rclcpp::CallbackGroup::SharedPtr callback_group_subscriber2_;
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription1_;
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription2_;
};

int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);

// You MUST use the MultiThreadedExecutor to use, well, multiple threads
rclcpp::executors::MultiThreadedExecutor executor;
auto pubnode = std::make_shared<PublisherNode>();
auto subnode = std::make_shared<DualThreadedNode>();

executor.add_node(pubnode);
executor.add_node(subnode);
executor.spin();
rclcpp::shutdown();
return 0;
}

以上仍然是ROS2官方example。我们分析仍然从主函数开始,主函数定义了多线程调度器,并且同时加入了发布节点和接收节点,executor.spin()进入了调度循环,我们暂时忽略类内回调组,下面会分析。

执行器关系

从代码结构上看,多线程执行器和单线程执行器相似度很高,你甚至可以把这段代码中多线程调度器替换成单线程调度器,同样可以编译。这两种执行器都是继承同一父类,代码结构类似。

多线程执行器对象创建时,构造函数可以调整启动线程数目,而线程创建和执行在spin()中完成,多线程的引入,给并发地执行回调提供了可能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
MultiThreadedExecutor::MultiThreadedExecutor(
const rclcpp::ExecutorOptions & options,
size_t number_of_threads,
bool yield_before_execute,
std::chrono::nanoseconds next_exec_timeout)
: rclcpp::Executor(options),
yield_before_execute_(yield_before_execute),
next_exec_timeout_(next_exec_timeout)
{
number_of_threads_ = number_of_threads ? number_of_threads : std::thread::hardware_concurrency();
if (number_of_threads_ == 0) {
number_of_threads_ = 1;
}
}

问题1:多线程执行器,与单线程执行器有关系?执行器会启动多少线程?
(1)单线程执行器和多线程执行器继承自同一个父类,具有相似的编码结构。
(2)在多线程执行器构造时,可以指定大于0的线程数目,或者默认线程数目”std::thread::hardware_concurrency()”,一般为机器CPU核心数。

多线程执行器spin()

虽然代码结构相似,但spin()行为却差异很大,多线程执行器启动多个线程,以便未来并发地执行回调:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//src/ros2/rclcpp/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp
void MultiThreadedExecutor::spin()
{
// 这部分与单线程执行器一致,都是为了防止多次spin()
if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
std::vector<std::thread> threads;
size_t thread_id = 0;

// 启动多个线程,值得注意的是:
// 1. 所有线程启动都在锁内保护,这个锁在run函数里也会用到
// 2. 注意这里少了一个线程,因为调用spin()函数的那个线程也要被使用,例如执行器开启8个线程:7个新线程+1个启动线程(启动7个线程的线程)
// 注:这里我也感觉是一个编程缺陷,作者可能想着同步所有线程,却把启动线程放在外边,可以考虑使用其他同步机制
{
std::lock_guard wait_lock{wait_mutex_};
for (; thread_id < number_of_threads_ - 1; ++thread_id) {
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id);
threads.emplace_back(func);
}
}

// 启动线程也运行run()
run(thread_id);
for (auto & thread : threads) {
thread.join();
}
}

也就是说,多线程执行器在spin()中启动了MultiThreadedExecutor::number_of_threads_个线程,并发运行MultiThreadedExecutor::run()函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//src/ros2/rclcpp/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp
void MultiThreadedExecutor::run(size_t this_thread_number)
{
(void)this_thread_number;
while (rclcpp::ok(this->context_) && spinning.load()) {
rclcpp::AnyExecutable any_exec;
// !!! 注意这里,代码这里也加了wait_mutex_锁
{
std::lock_guard wait_lock{wait_mutex_};
if (!rclcpp::ok(this->context_) || !spinning.load()) {
return;
}

// 尝试获取一个可调用对象
// 这部分在wait_mutex_锁内,所以同时只有一个线程能进入
if (!get_next_executable(any_exec, next_exec_timeout_)) {
continue;
}
}
if (yield_before_execute_) {
std::this_thread::yield();
}

// 执行可调用对象
// 这部分在wait_mutex_锁外部,所以可以并发地执行
execute_any_executable(any_exec);

// 可调用对象已经无用了,删除
any_exec.callback_group.reset();
}
}

和单线程执行器类似,这里同样通过是get_next_executable()->execute_any_executable()。
注意代码包含wait_mutex_锁位置,查询并获取下一个可调用对象的函数get_next_executable(),不是并行执行,而且多线程间互斥执行的。执行可回调的函数execute_any_executable(),并没有wait_mutex_锁的保护,所以是可以并行执行的。

问题2:多线程执行器spin()和单线程spin()差异在哪里?
(1)多线程执行器的spin()启动多个线程,如果用户只启动一个线程,就退化到单线程执行器
(2)spin()互斥执行get_next_executable(),并发执行execute_any_executable(),给予回调并发执行能力

回调组

到这里你可能以为:所有的回调函数在多线程执行器调度下都是并发执行的,这个结论仍然是不充分的。
以上论述仅仅能说明:多线程执行器提供并行执行能力,但并没有说明并发执行的单元是谁?并发会不会由于某些限制,退化到串行执行?回调函数是不是会发生重入?等。要解释清楚这些,就要从回调组说起,先直接给出问题3-问题5的答案,然后再分析代码。
以下是官方的回调组定义(Using Callback Groups — ROS 2 Documentation: Humble documentation):

比较简单明了,这里总结一下回调组的差异:

问题3:什么是回调组?”Mutually Exclusive Callback Group”和”Reentrant Callback Group”区别是什么?
回调组是用来控制回调并发执行的回调集合(在多线程执行器有实际意义,因为单线程执行器本身就是串行的),从调度角度看,ROS2 Node可以理解为回调组的集合,节点包含多个回调组(节点默认会创建一个互斥回调组),回调组分为两种:
(1)互斥回调组(Mutually Exclusive Callback Group):不同回调组回调可并发地执行,同一个回调组的回调串行地执行;
(2)可重入回调组(Reentrant Callback Group):没有限制,不同回调组的回调可并发地执行,同一个回调组的回调也可并发地执行。

问题4:多线程执行器并行调度的单元是什么,是节点吗?怎么做到回调并行执行?
多线程执行器调度基本单元是回调函数。节点是回调组的集合,每个回调组可以包含多个回调,回调组通过设置不同的属性(互斥或重入)控制回调并发执行的行为。

在官网的说明,对于Reentrant Callback Group中有一句话:This means that, in addition to different callbacks being run parallel to each other, different instances of the same callback may also be executed concurrently.

问题5:”Reentrant Callback Group”能否导致同一个节点的同一个话题的回调,被多个线程并发调用?
可以,也就是说在多线程执行器中,如果消息或请求持续到来,并且有空闲线程,那么同一个回调会被并行执行。这是一个“危险的”特性,这要求函数可重入,个人推测这也是Reentrant Callback Group名字中Reentrant的由来。

大家对于以上的特性有了概念后,我们就能继续分析代码,并且说明ros2 humble怎么通过代码实现以上的特性。再次祭出官网的调度图(以下简称:调度图),只不过这一次调度被并行执行。

如之前单线程执行器中所述,整个过程核心就是:

  • 等待并获取可调用对象在Executor::get_next_executable()中,分别对应1)wait和2)take;
  • 执行可调用对象回调函数在Executor::execute_any_executable()中,对应3)execute。
    具体代码和注释如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// src/ros2/rclcpp/rclcpp/src/rclcpp/executor.cpp
// 获取下一个可调用对象,我们暂时不考虑这里timeout,任务这里会持续等待
bool Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanoseconds timeout)
{
bool success = false;
// 检查是否有可调用的对象:话题、服务、定时器等
// 作者也在这里加入了TODO,因为这里是循环对比所有请求对象是否有效,所以在底层请求对象(如话题)超级多的情况下,会给CPU形成明显的负担
// TODO(wjwwood): improve run to run efficiency of this function
success = get_next_ready_executable(any_executable);
if (!success) {
// 如果没有获取成功,就尝试从底层等待消息的到来
wait_for_work(timeout);

// 确保spining持续进行,外部没有退出
if (!spinning.load()) {
return false;
}

// 忽略timeout的话,消息或请求应该是等到了,再次尝试是否能够拿到可调用对象
success = get_next_ready_executable(any_executable);
}
return success;
}

之前说过,get_next_executable()在多线程执行器中,是互斥执行的。也就是说同时只有一个线程会执行该函数,暂时不必担心该函数的线程安全问题。
wait_for_work()就是调度图中的1)wait,而get_next_ready_executable()函数就是调度图中的2)take。举个例子,假设这次没有拿到可调用对象,执行了wait_for_work()尝试更新wait_set,具体代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// src/ros2/rclcpp/rclcpp/src/rclcpp/executor.cpp
void Executor::wait_for_work(std::chrono::nanoseconds timeout)
{
TRACEPOINT(rclcpp_executor_wait_for_work, timeout.count());
{
std::lock_guard<std::mutex> guard(mutex_);

// 将执行器下的所有回调组,加入到Executor的map数据结构里,方便索引
add_callback_groups_from_nodes_associated_to_executor();

// 收集所有的准备消息订阅、服务、客户端、定时器、waitable,这些都以handles形式存储到memory_strategy_
// !!collect_entities中包含有一个group->can_be_taken_from().load(),可以防止正在执行的互斥回调组二次加入,后面细说
memory_strategy_->clear_handles();
bool has_invalid_weak_groups_or_nodes =
memory_strategy_->collect_entities(weak_groups_to_nodes_);

if (has_invalid_weak_groups_or_nodes) {
// 无效的group和node处理,不关心
//.....
}

// 清空wait_set,注意这里调用的是rcl(ROS Client Support Library)库,具体底层暂时不关心
rcl_ret_t ret = rcl_wait_set_clear(&wait_set_);
if (ret != RCL_RET_OK) {
throw_from_rcl_error(ret, "Couldn't clear wait set");
}

// 根据收集的订阅、定时器等,重新申请wait_set内部数据结构内存
// 奇怪的是:这里传递的size参数都没用,直接从底层确定订阅、定时器等数量
ret = rcl_wait_set_resize(
&wait_set_, memory_strategy_->number_of_ready_subscriptions(),
memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(),
memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(),
memory_strategy_->number_of_ready_events());
if (RCL_RET_OK != ret) {
throw_from_rcl_error(ret, "Couldn't resize the wait set");
}

// 把需要等待的订阅、定时器、客户端、服务端对象加入到待等待集合,并设置好rmw底层数据结构
// 函数会填充类似wait_set->impl->rmw_type.type[i]数据结构(一堆宏函数)
if (!memory_strategy_->add_handles_to_wait_set(&wait_set_)) {
throw std::runtime_error("Couldn't fill wait set");
}
}

// 等待,内部调用了rmw_wait等待
// 函数会根据消息有无、定时器是否到期等,填充类似wait_set->impl->rmw_type.types[i]设置
rcl_ret_t status =
rcl_wait(&wait_set_, std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count());
if (status == RCL_RET_WAIT_SET_EMPTY) {
RCUTILS_LOG_WARN_NAMED(
"rclcpp",
"empty wait set received in rcl_wait(). This should never happen.");
} else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) {
using rclcpp::exceptions::throw_from_rcl_error;
throw_from_rcl_error(status, "rcl_wait() failed");
}

// check the null handles in the wait set and remove them from the handles in memory strategy
// for callback-based entities
std::lock_guard<std::mutex> guard(mutex_);
// wait_set_没有准备好的订阅、定时器等对象会是NULL,会在这里被从Executor的Handles数组清除,没有被清除的元素都是有效的请求
// 在后面做2)take的时候,直接对比的是Executor的Handles数据结构和节点中的订阅、定时器等,确定对象是否有效
memory_strategy_->remove_null_handles(&wait_set_);
}

上面代码大致:在收集完订阅、定时器、服务端、客户端、waitable对象后,初始化rmw底层数据结构,并调用rcl_wait()尝试等待(注意这里wait_set_具有复杂数据结构,并非简单数组)。
在rcl_wait()内部调用rmw_wait(),确定消息订阅有没有数据、定时器是否就绪等,并基于此更新wait_set_数据结构(调度图中wait_set=101,就是指/goal、/odom有数据,/cmd无数据)。然后拿到wait_set_后,Executor根据wait_set_更新自己的内部数据结构memory_strategy_,简单说就是从订阅、定时器等Handles数组中,抹去无数据的订阅、没有就绪定时器等(对应位置设置为NULL),方便之后2)take循环检查。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// src/ros2/rcl/rcl/src/rcl/wait.c
rcl_ret_t rcl_wait(rcl_wait_set_t * wait_set, int64_t timeout)
{
// ... timeout等处理,跳过

// 尝试等待
rmw_ret_t ret = rmw_wait(
&wait_set->impl->rmw_subscriptions,
&wait_set->impl->rmw_guard_conditions,
&wait_set->impl->rmw_services,
&wait_set->impl->rmw_clients,
&wait_set->impl->rmw_events,
wait_set->impl->rmw_wait_set,
timeout_argument);

// 对于还没准备好的定时器、订阅,更新对应的handles数组,设置为NULL

size_t i;
for (i = 0; i < wait_set->impl->timer_index; ++i) {
if (!wait_set->timers[i]) {
continue;
}
bool is_ready = false;
rcl_ret_t ret = rcl_timer_is_ready(wait_set->timers[i], &is_ready);
if (ret != RCL_RET_OK) {
return ret; // The rcl error state should already be set.
}
if (!is_ready) {
wait_set->timers[i] = NULL;
}
}

//.....

// Set corresponding rcl subscription handles NULL.
for (i = 0; i < wait_set->size_of_subscriptions; ++i) {
bool is_ready = wait_set->impl->rmw_subscriptions.subscribers[i] != NULL;
if (!is_ready) {
wait_set->subscriptions[i] = NULL;
}
}

//....其他类似处理

return RCL_RET_OK;
}

memory_strategy_->remove_null_handles(&wait_set_);移除Handles元素过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// src/ros2/rclcpp/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp
void remove_null_handles(rcl_wait_set_t * wait_set) override
{
for (size_t i = 0; i < subscription_handles_.size(); ++i) {
if (!wait_set->subscriptions[i]) {
subscription_handles_[i].reset();
}
}
// ......

subscription_handles_.erase(
std::remove(subscription_handles_.begin(), subscription_handles_.end(), nullptr),
subscription_handles_.end()
);

// ......
}

我们已经获取了所有的准备好的消息、定时器等,紧接着就是2)take阶段,从其中选择一个可调用对象,官网上流程图展示了这个过程Executors — ROS 2 Documentation: Humble documentation

对应的具体代码如下,你可以在图和代码一一对应上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// src/ros2/rclcpp/rclcpp/src/rclcpp/executor.cpp
bool Executor::get_next_ready_executable(AnyExecutable & any_executable)
{
// 直接转发到get_next_ready_executable_from_map
bool success = get_next_ready_executable_from_map(any_executable, weak_groups_to_nodes_);
return success;
}

bool Executor::get_next_ready_executable_from_map(
AnyExecutable & any_executable,
const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap &
weak_groups_to_nodes)
{
TRACEPOINT(rclcpp_executor_get_next_ready);
bool success = false;
std::lock_guard<std::mutex> guard{mutex_};
// Check the timers to see if there are any that are ready
memory_strategy_->get_next_timer(any_executable, weak_groups_to_nodes);
if (any_executable.timer) {
success = true;
}
if (!success) {
// Check the subscriptions to see if there are any that are ready
memory_strategy_->get_next_subscription(any_executable, weak_groups_to_nodes);
if (any_executable.subscription) {
success = true;
}
}
if (!success) {
// Check the services to see if there are any that are ready
memory_strategy_->get_next_service(any_executable, weak_groups_to_nodes);
if (any_executable.service) {
success = true;
}
}
if (!success) {
// Check the clients to see if there are any that are ready
memory_strategy_->get_next_client(any_executable, weak_groups_to_nodes);
if (any_executable.client) {
success = true;
}
}
if (!success) {
// Check the waitables to see if there are any that are ready
memory_strategy_->get_next_waitable(any_executable, weak_groups_to_nodes);
if (any_executable.waitable) {
any_executable.data = any_executable.waitable->take_data();
success = true;
}
}
// At this point any_executable should be valid with either a valid subscription
// or a valid timer, or it should be a null shared_ptr
if (success) {
rclcpp::CallbackGroup::WeakPtr weak_group_ptr = any_executable.callback_group;
auto iter = weak_groups_to_nodes.find(weak_group_ptr);
if (iter == weak_groups_to_nodes.end()) {
success = false;
}
}

if (success) {
// If it is valid, check to see if the group is mutually exclusive or
// not, then mark it accordingly ..Check if the callback_group belongs to this executor
if (any_executable.callback_group && any_executable.callback_group->type() == CallbackGroupType::MutuallyExclusive)
{
// It should not have been taken otherwise
assert(any_executable.callback_group->can_be_taken_from().load());
// Set to false to indicate something is being run from this group
// This is reset to true either when the any_executable is executed or when the
// any_executable is destructued
any_executable.callback_group->can_be_taken_from().store(false);
}
}
// If there is no ready executable, return false
return success;
}

这里我想说明三点:

  1. 显然这里的处理是有优先级的,优先定时器->订阅->服务->客户端->其他可调用对象,个人认为这种优先级可能在不当编程下造成部分请求回调的饥饿;
  2. 这里get_next_xxxx函数,为了解决有效性的问题,这里采用循环和当前回调组内部的话题对比,个人认为在大量话题的系统下,会显著增加CPU负担;
  3. 在take可调用对象的时候,也做了group->can_be_taken_from().load()判断,确定是否允许访问已经在执行回调组。
    以下是take一个订阅的过程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// src/ros2/rclcpp/rclcpp/src/rclcpp/executor.cpp
void get_next_subscription(
rclcpp::AnyExecutable & any_exec,
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override
{
auto it = subscription_handles_.begin();
while (it != subscription_handles_.end()) {
auto subscription = get_subscription_by_handle(*it, weak_groups_to_nodes);
if (subscription) {
// Find the group for this handle and see if it can be serviced
auto group = get_group_by_subscription(subscription, weak_groups_to_nodes);
if (!group) {
// Group was not found, meaning the subscription is not valid...
// Remove it from the ready list and continue looking
it = subscription_handles_.erase(it);
continue;
}
// !!这里很重要,回调组的属性由这里体现
if (!group->can_be_taken_from().load()) {
// Group is mutually exclusive and is being used, so skip it for now
// Leave it to be checked next time, but continue searching
++it;
continue;
}
// Otherwise it is safe to set and return the any_exec
any_exec.subscription = subscription;
any_exec.callback_group = group;
any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes);
subscription_handles_.erase(it);
return;
}
// Else, the subscription is no longer valid, remove it and continue
it = subscription_handles_.erase(it);
}
}

当我们拿到可调用的对象,剩下的就是执行可调用对象了。我们以execute_subscription()为例,如果有消息到来,则execute_subscription()会处理好序列化,并直接调用已经存储好的callback函数,细节不在这里赘述。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// src/ros2/rclcpp/rclcpp/src/rclcpp/executor.cpp
void Executor::execute_any_executable(AnyExecutable & any_exec)
{
if (!spinning.load()) {
return;
}
if (any_exec.timer) {
TRACEPOINT(
rclcpp_executor_execute,
static_cast<const void *>(any_exec.timer->get_timer_handle().get()));
execute_timer(any_exec.timer);
}
if (any_exec.subscription) {
TRACEPOINT(
rclcpp_executor_execute,
static_cast<const void *>(any_exec.subscription->get_subscription_handle().get()));
execute_subscription(any_exec.subscription);
}
if (any_exec.service) {
execute_service(any_exec.service);
}
if (any_exec.client) {
execute_client(any_exec.client);
}
if (any_exec.waitable) {
any_exec.waitable->execute(any_exec.data);
}

// !!! 这里很重要
any_exec.callback_group->can_be_taken_from().store(true);
// Wake the wait, because it may need to be recalculated or work that
// was previously blocked is now available.
try {
interrupt_guard_condition_.trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition from execute_any_executable: ") + ex.what());
}
}

到这里问题4基本明白。问题3和问题5还没有被理清楚,因为代码跨度太大,为了弄清楚调度逻辑,我们需要跟踪can_be_taken_from(),才能说明并发的流程。为了方便说明,暂时把其他不重要的代码省略掉。仍然从run开始:

  • 整个调用流程是互斥的执行get_next_executable();
  • 并发的执行execute_any_executable()。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// src/ros2/rclcpp/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp
void MultiThreadedExecutor::run(size_t this_thread_number)
{
while (rclcpp::ok(this->context_) && spinning.load()) {
rclcpp::AnyExecutable any_exec;
{
std::lock_guard wait_lock{wait_mutex_};
if (!get_next_executable(any_exec, next_exec_timeout_)) {
continue;
}
}

execute_any_executable(any_exec);
any_exec.callback_group.reset();
}
}

get_next_executable()是互斥执行的,当满足两个条件,那么can_be_taken_from_会设置为false,表示不允许再执行该回调组里的回调:

  • 成功take了一个可调用对象;
  • 可调用对象所属的回调组类型为CallbackGroupType::MutuallyExclusive。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// src/ros2/rclcpp/rclcpp/src/rclcpp/executor.cpp
bool Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanoseconds timeout)
{
bool success = get_next_ready_executable(any_executable);
if (!success) {
wait_for_work(timeout);
success = get_next_ready_executable(any_executable);
}

return success;
}

bool Executor::get_next_ready_executable(AnyExecutable & any_executable)
{
bool success = get_next_ready_executable_from_map(any_executable, weak_groups_to_nodes_);
return success;
}

bool Executor::get_next_ready_executable_from_map(
AnyExecutable & any_executable,
const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap &weak_groups_to_nodes)
{
TRACEPOINT(rclcpp_executor_get_next_ready);
bool success = false;
std::lock_guard<std::mutex> guard{mutex_};

//.......

if (success) {
if (any_executable.callback_group && any_executable.callback_group->type() \
== CallbackGroupType::MutuallyExclusive)
{
assert(any_executable.callback_group->can_be_taken_from().load());
any_executable.callback_group->can_be_taken_from().store(false);
}
}

return success;
}

并发的执行execute_any_executable()的最后,会将可调用对象所在回调组的can_be_taken_from_设置为true,表示该回调组的回调已经执行完毕,可以再次执行该回调了。

1
2
3
4
5
6
7
8
void Executor::execute_any_executable(AnyExecutable & any_exec)
{
if (any_exec.subscription) {
execute_subscription(any_exec.subscription);
}

any_exec.callback_group->can_be_taken_from().store(true);
}

根据can_be_taken_from_的值,确定回调是否能被执行的代码如下,程序在收集对象Handles、take的时候都有判断:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
bool collect_entities(const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override
{
//......
if (!group || !group->can_be_taken_from().load()) {
continue;
}
//......
return has_invalid_weak_groups_or_nodes;
}

void get_next_subscription(
rclcpp::AnyExecutable & any_exec,
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override
{
auto it = subscription_handles_.begin();
while (it != subscription_handles_.end()) {
auto subscription = get_subscription_by_handle(*it, weak_groups_to_nodes);
if (subscription) {
auto group = get_group_by_subscription(subscription, weak_groups_to_nodes);
if (!group->can_be_taken_from().load()) {
++it;
continue;
}

any_exec.subscription = subscription;
any_exec.callback_group = group;
any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes);
subscription_handles_.erase(it);
return;
}

it = subscription_handles_.erase(it);
}
}

如果回调组类型为CallbackGroupType::Reentrant,那么就不会将can_be_taken_from_设置为false,回调组可以被持续调用。也就是说,如果有消息持续到来,并且有足够多线程,即使是同一个节点、同一个回调组的同一个函数,可能被多线程并行调用,这就要求在编程中,注意函数要支持可重入。(实验方法很简单,只需要改大QoS,假设发送端按照5Hz发送,订阅回调只需要sleep(1s),那么就会造成消息处理不过来,其他线程就会进入同一个订阅回调,处理消息)。


ROS2 Executor详解
https://ziqingdream.github.io/2025/04/13/-1-机器人技术栈/-1-ROS2操作系统/-1-ROS2-Humble代码阅读/【1】ROS2 Executor详解/
作者
工程课代表[B站] / ZiQingDream[Github]
发布于
2025年4月13日
许可协议